agentmux_srv\backend\process_tracker/
registry.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `AgentProcessRegistry` — app-wide map from block_id → tracker handle.
5//!
6//! Created once at host startup, passed into each `SubprocessController`
7//! / `PersistentSubprocessController` instance so it can wrap spawns
8//! in its per-block job. Polled periodically from a background task
9//! that emits `agent:process-added` / `agent:process-exited` events to
10//! the frontend's swarm activity panel.
11//!
12//! Centralizing this here (rather than one tracker per controller)
13//! means the lifetime of the tracker matches the lifetime of the pane
14//! — multiple turns on the same block share the same job, so
15//! descendants from turn N are still visible on turn N+1.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::{Arc, OnceLock};
19
20use parking_lot::Mutex;
21
22use super::{new_tracker, TrackedProcess, TrackerHandle, TrackingConfidence};
23use crate::backend::wps;
24
25/// Host-wide registry, set once at startup. Exposed as a global so
26/// `SubprocessController` / `PersistentSubprocessController` can reach
27/// it without threading an `Arc` through every constructor + test site.
28/// Tests that don't initialize it see `None` and silently skip
29/// tracker registration — the job-object spawn path no-ops cleanly.
30static GLOBAL: OnceLock<Arc<AgentProcessRegistry>> = OnceLock::new();
31
32pub fn set_global(registry: Arc<AgentProcessRegistry>) {
33    let _ = GLOBAL.set(registry);
34}
35
36pub fn global() -> Option<Arc<AgentProcessRegistry>> {
37    GLOBAL.get().cloned()
38}
39
40pub struct AgentProcessRegistry {
41    inner: Mutex<HashMap<String, RegistryEntry>>,
42    broker: Option<Arc<wps::Broker>>,
43}
44
45struct RegistryEntry {
46    tracker: Arc<dyn TrackerHandle>,
47    /// Last-known PID set from the most recent poll. Used to diff
48    /// against the current set so we only emit events for
49    /// additions/removals — not on every poll tick.
50    last_pids: HashSet<u32>,
51}
52
53impl AgentProcessRegistry {
54    pub fn new(broker: Option<Arc<wps::Broker>>) -> Self {
55        Self {
56            inner: Mutex::new(HashMap::new()),
57            broker,
58        }
59    }
60
61    /// Ensure a tracker exists for this block. Idempotent — calling
62    /// twice for the same block returns the existing tracker so the
63    /// job survives controller re-creation (e.g. on /clear).
64    pub fn ensure_tracker(&self, block_id: &str) -> Arc<dyn TrackerHandle> {
65        let mut map = self.inner.lock();
66        if let Some(entry) = map.get(block_id) {
67            return entry.tracker.clone();
68        }
69        let tracker = new_tracker(block_id);
70        map.insert(
71            block_id.to_string(),
72            RegistryEntry {
73                tracker: tracker.clone(),
74                last_pids: HashSet::new(),
75            },
76        );
77        tracing::info!(
78            block_id = %block_id,
79            confidence = ?tracker.confidence(),
80            "[process-tracker] registered tracker"
81        );
82        tracker
83    }
84
85    /// Drop a block's tracker — call when the pane closes. The tracker's
86    /// Drop impl kills the whole process tree (via `KILL_ON_JOB_CLOSE`
87    /// on Windows, `cgroup.kill` on Linux, `killpg` on macOS).
88    pub fn remove(&self, block_id: &str) {
89        let mut map = self.inner.lock();
90        if map.remove(block_id).is_some() {
91            tracing::info!(block_id = %block_id, "[process-tracker] dropped tracker on pane close");
92        }
93    }
94
95    /// Current members of a block's tracker, for the RPC endpoint.
96    pub fn list_block(&self, block_id: &str) -> Vec<TrackedProcess> {
97        self.inner
98            .lock()
99            .get(block_id)
100            .map(|e| e.tracker.list_members())
101            .unwrap_or_default()
102    }
103
104    /// Confidence of a block's tracker — drives the "tracking is
105    /// best-effort on macOS" badge in the swarm UI.
106    pub fn confidence_of(&self, block_id: &str) -> TrackingConfidence {
107        self.inner
108            .lock()
109            .get(block_id)
110            .map(|e| e.tracker.confidence())
111            .unwrap_or(TrackingConfidence::None)
112    }
113
114    /// All tracked blocks — used by the swarm panel's aggregate view.
115    pub fn list_all_blocks(&self) -> Vec<String> {
116        self.inner.lock().keys().cloned().collect()
117    }
118
119    /// Kill the entire process tree for a given block. Returns `true`
120    /// if a tracker was found (the kill was dispatched). Does NOT
121    /// synchronously wait for descendants to actually exit — the
122    /// poller's next tick will pick up the state changes and emit
123    /// `agent:process-exited` events the frontend can react to.
124    pub fn kill_tree(&self, block_id: &str) -> bool {
125        let tracker = self.inner.lock().get(block_id).map(|e| e.tracker.clone());
126        match tracker {
127            Some(t) => {
128                t.kill_tree();
129                true
130            }
131            None => false,
132        }
133    }
134
135    /// Kill a single PID if it's a member of the given block's tree.
136    /// Returns `true` if the tracker was found and the PID matched.
137    pub fn kill_pid(&self, block_id: &str, pid: u32) -> bool {
138        let tracker = self.inner.lock().get(block_id).map(|e| e.tracker.clone());
139        match tracker {
140            Some(t) => t.kill_pid(pid),
141            None => false,
142        }
143    }
144
145    /// Poll every tracked block's membership and diff against the
146    /// last-known set. Emits `agent:process-added` / `-exited` events
147    /// for each delta.
148    ///
149    /// Called by a background Tokio task on a ~2s interval.
150    pub fn poll_and_emit(&self) {
151        let mut map = self.inner.lock();
152        for (block_id, entry) in map.iter_mut() {
153            let current_members = entry.tracker.list_members();
154            let current_pids: HashSet<u32> = current_members.iter().map(|p| p.pid).collect();
155
156            for added_pid in current_pids.difference(&entry.last_pids) {
157                if let Some(p) = current_members.iter().find(|p| p.pid == *added_pid) {
158                    self.emit(
159                        "agent:process-added",
160                        block_id,
161                        serde_json::json!({ "block_id": block_id, "process": p }),
162                    );
163                }
164            }
165            for removed_pid in entry.last_pids.difference(&current_pids) {
166                self.emit(
167                    "agent:process-exited",
168                    block_id,
169                    serde_json::json!({ "block_id": block_id, "pid": removed_pid }),
170                );
171            }
172
173            entry.last_pids = current_pids;
174        }
175    }
176
177    fn emit(&self, event_name: &str, block_id: &str, data: serde_json::Value) {
178        let Some(ref broker) = self.broker else { return };
179        broker.publish(wps::WaveEvent {
180            event: event_name.to_string(),
181            scopes: vec![format!("block:{}", block_id)],
182            sender: String::new(),
183            persist: 0,
184            data: Some(data),
185        });
186    }
187}
188
189/// Spawn the polling task. Drops when the registry's `Arc` refcount
190/// hits zero (host shutdown). ~2s cadence balances latency (new
191/// processes show up fast) with CPU overhead (job queries are cheap
192/// but not free).
193pub fn spawn_poller(registry: Arc<AgentProcessRegistry>) {
194    tokio::spawn(async move {
195        let mut interval = tokio::time::interval(std::time::Duration::from_secs(2));
196        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
197        loop {
198            interval.tick().await;
199            registry.poll_and_emit();
200        }
201    });
202}